package co.recloud.ariadne.model.plan;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import co.recloud.ariadne.model.logical.Column;
import co.recloud.ariadne.model.logical.Table;
import co.recloud.ariadne.model.logical.Transaction;
import co.recloud.ariadne.persistor.KeyValuePersistor;
import co.recloud.ariadne.persistor.KeyValuePersistorImpl;

public class Project extends Operator {
	/**
	 * 
	 */
	private static final long serialVersionUID = 8596570062554545114L;

	public Project(Transaction transaction) {
		super(transaction);
		// TODO Auto-generated constructor stub
	}

	private Set<Column> columns;

	/**
	 * @param columns
	 *            the columns to set
	 */
	public void setColumns(Set<Column> columns) {
		this.columns = columns;
	}

	/**
	 * @return the columns
	 */
	public Set<Column> getColumns() {
		return columns;
	}

	public String toString() {
		return "Project: " + columns + " to result set "
				+ this.getTable().getName() + this.getChildren() + "\n";
	}

	public void execute(String localhost, int port) {
		this.getChildren().get(0).execute(localhost, port);
		KeyValuePersistor kv = new KeyValuePersistorImpl(this.getTransaction());
		Table childTable = this.getChildren().get(0).getTable();
		Table resultTable = this.getTable();
		Set<String> keys = kv.getAllKeys(childTable);
		Set<Column> copyColumns = columns;
		columns = new HashSet<Column>();
		columns.addAll(copyColumns);
		Map<String, Column> fieldToColumn = new HashMap<String, Column>();
		Set<Column> searchColumns = new HashSet<Column>();
		for(Column column : columns) {
			String field;
			if(column.getField().split("\\.").length > 1) {
				field = column.getField();
			} else {
				field = column.getTable().getAlias() + "." + column.getField();
			}
			Column searchColumn = new Column(childTable, field);
			fieldToColumn.put(field, column);
			searchColumns.add(searchColumn);
		}
		if (keys != null) {
			for (String key : keys) {
				Map<String, Object> columnMap = kv.getColumns(key,
						searchColumns);
				for (String columnString : columnMap.keySet()) {
					Column column = fieldToColumn.get(columnString);
					Column resultColumn = new Column(resultTable, column
							.getTable().getAlias() + "." + column.getField());
					String value = (String) columnMap.get(columnString);
					kv.setValue(key, resultColumn, value);
				}
				Column pkColumn = Column.parseColumn(Table.PRIMARY_KEY,
						childTable, this.getTableAliases());
				if (columns.contains(pkColumn)) {
					kv.setValue(
							key,
							Column.parseColumn(Table.PRIMARY_KEY,
									this.getTable(), this.getTableAliases()),
							key);
				}
			}
		}
		if (childTable.isResultSet()) {
			kv.dropTable(childTable);
		}
	}
}
